package com.atuguigu.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    /**
     * Entry point: assembles a word-count pipeline over two fixed input lines and runs it.
     *
     * <p>Pipeline shape: source -> flatMap ({@code Tokenizer}) -> keyBy(word) -> reduce -> print.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the stream execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a parallelism of 1.
        env.setParallelism(1);

        // Data source: two fixed in-memory lines.
        final DataStreamSource<String> lines = env.fromElements("hello word", "hello word");

        // flatMap: each input element may produce zero, one or many output elements.
        // Here every line is tokenized, e.g. "hello word" -> (hello, 1), (word, 1).
        final SingleOutputStreamOperator<WordWithCount> tokens = lines.flatMap(new Tokenizer());

        // Shuffle / group: records are keyed by their word.
        final KeyedStream<WordWithCount, String> byWord = tokens.keyBy(wc -> wc.word);

        // Reduce: merge two records of the same key by adding their counts,
        // keeping the word of the first argument.
        final ReduceFunction<WordWithCount> sumCounts =
                (acc, next) -> new WordWithCount(acc.word, acc.count + next.count);
        final SingleOutputStreamOperator<WordWithCount> totals = byWord.reduce(sumCounts);

        // Print the reduced stream.
        totals.print();

        // Execute the job.
        env.execute();
    }


/**
 * Splits each input line on single space characters and emits one
 * {@code WordWithCount} with a count of 1 for every non-empty token.
 */
public static class Tokenizer implements FlatMapFunction<String,WordWithCount>{

    /**
     * Tokenizes one line and forwards a {@code (token, 1)} record per token.
     *
     * @param s         the input line; split on the single-space delimiter {@code " "}
     * @param collector receives one {@code WordWithCount(token, 1L)} per non-empty token
     * @throws Exception declared by the {@code FlatMapFunction} contract
     */
    @Override
    public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
        for (String token : s.split(" ")) {
            // split(" ") yields "" for an empty line, a leading space, or
            // consecutive spaces; such tokens are not words and must not be counted.
            if (token.isEmpty()) {
                continue;
            }
            collector.collect(new WordWithCount(token, 1L));
        }
    }
}


/**
 * Mutable holder pairing a word with a count.
 *
 * <p>NOTE(review): the public fields and the no-argument constructor presumably
 * exist so the stream framework can handle this as a POJO type — confirm before
 * changing either.
 */
public static class WordWithCount{
    /** The word; {@code null} until assigned. */
    public String word;
    /** The count associated with {@link #word}; {@code null} until assigned. */
    public Long count;

    /** Creates an instance with both fields left {@code null}. */
    public WordWithCount() {
    }

    /**
     * Creates an instance with the given word and count.
     *
     * @param word  the word to store
     * @param count the count to store
     */
    public WordWithCount(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    /**
     * Renders this instance as {@code WordWithCount{word='<word>', count=<count>}}.
     *
     * @return the textual form; {@code null} fields are rendered as {@code null}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("WordWithCount{");
        text.append("word='").append(word).append('\'');
        text.append(", count=").append(count);
        text.append('}');
        return text.toString();
    }
}



}


